// Copyright (c) 2020-present,  INSPUR Co, Ltd.  All rights reserved.
// This source code is licensed under Apache 2.0 License.
#include <util/util.h>
#include <db/merge_helper.h>
#include "db/range_del_aggregator.h"
#include "memory_arena.h"
#include "rocksdb/merge_operator.h"

namespace rocksdb{

// liliupeng@inspur.com: in-memory storage area.
//
// Builds an arena bound to the given internal-key comparator and immutable
// column-family options. The reference count starts at zero, and the lock
// table gets 1000 entries only when in-place updates are enabled (otherwise
// it is empty).
MemArena::MemArena(const InternalKeyComparator& mkeyCmp, const ImmutableCFOptions& ioptions)
    : IMemoryArena(mkeyCmp, ioptions),
      mkey_cmp_(mkeyCmp),
      moptions_(ioptions),
      refs_(0),
      locks_(moptions_.inplace_update_support ? 1000 : 0) {
  // No key has been inserted yet, so the oldest-key timestamp starts at zero.
  const int64_t initial_time = 0;
  oldest_key_time_ = static_cast<uint64_t>(initial_time);
}

// Nothing to release explicitly here; members clean up through their own
// destructors.
MemArena::~MemArena() = default;

const char *MemArena::GenerateNode(bool encode_version_node, const Slice &key,
                                   const Slice &value, size_t *encoded_len) {
  VersionNode* node = nullptr;
  size_t key_size = key.size();
  size_t val_size = value.size();
  *encoded_len = VarintLength(key_size) + key_size +
                 VarintLength(val_size) + val_size;
  if(encode_version_node){
    char* buf = new char[*encoded_len];
    VersionNode::Encode(key, value, buf);
    node = new EncodedVersionNode();
    node->SetNext(nullptr);
    node->SetPrev(nullptr);
    bool ok = ((EncodedVersionNode*)node)->CASSetKey(nullptr, buf);
    assert(ok);
  } else{
    MvccKey mk;
    mk.parseKey(key);
    uint64_t seq;
    ValueType t;
    UnPackSequenceAndType(mk.seqNumAndType_, &seq, &t);
    node = new UncodedVersionNode(mk.userKeyWithTime_, value, seq, t);
  }

  return reinterpret_cast<const char *>(node);
}


//key为internal_key,即包含userkey,seq,type
bool MemArena::RangeDeletionInsert(const Slice& key, const Slice& value){

  size_t encode_len;
  auto node = GenerateNode(moptions_.encode_version_node, key, value, &encode_len);
  bool res = range_del_list_.Insert(node);
  if(!res){
    if(moptions_.encode_version_node){
      delete[] ((EncodedVersionNode*)node)->Key();
    }
    delete node;
  }
  return res;
}

//key为internal_key,即包含userkey,seq,type
bool MemArena::KVDataInsert(const Slice& key, const Slice& value){
  size_t encoded_len = 0;
  auto node = GenerateNode(moptions_.encode_version_node, key, value, &encoded_len);
  bool res = kv_list_.Insert(node);
  if(res){
    kv_insert_num_.fetch_add(encoded_len);
  } else{
    if(moptions_.encode_version_node){
      delete[] ((EncodedVersionNode*)node)->Key();
    }
    delete node;
  }

  return res;
}


//key为internal_key,即包含userkey,seq,type
bool MemArena::Insert(const Slice& key, const Slice& value) {
  size_t encoded_len = 0;
  auto node =
      GenerateNode(moptions_.encode_version_node, key, value, &encoded_len);
  bool res = range_del_list_.Insert(node);
  if(res){
    kv_insert_num_.fetch_add(encoded_len);
  } else{
    if(moptions_.encode_version_node){
      delete[] ((EncodedVersionNode*)node)->Key();
    }
    delete node;
  }
  return res;
}

// Positions an iterator over the key/value list at `key` (a memtable key).
//
// Callers must check the result for nullptr: nullptr means
// RangeDeletionSeek() reported the key as lying inside a range deletion.
// Otherwise the returned iterator has already been HashSeek()'d to `key`.
ArtTree::Iterator* MemArena::Seek(const Slice& key){
  ArtTree::Iterator* range_hit = RangeDeletionSeek(key);
  const bool covered = (range_hit != nullptr) && range_hit->Valid();
  if (covered) {
    return nullptr;
  }

  ArtTree::Iterator* kv_iter = kv_list_.GetIterator();
  kv_iter->HashSeek(key);
  return kv_iter;
}

// Looks `key` up in the range-deletion list.
//
// If nullptr is returned, the key is not inside a range deletion.
// If an iterator is returned, the caller must still check Valid() on it
// (see Seek(), which treats a valid iterator as "covered").
//
// Each range-deletion entry is read as: start = the entry's user key,
// end = the entry's value (extracted by AnalysisNode()).
//
// NOTE(review): on the nullptr paths the iterator obtained from
// GetIterator() is not handed back to anyone - confirm who owns it.
ArtTree::Iterator* MemArena::RangeDeletionSeek(const Slice& key){
  auto iter = range_del_list_.GetIterator();
  iter->HashSeek(key);
  if(iter->Valid()){
    uint64_t iter_seq, seq;
    ValueType iter_t, t;
    MvccKey mk, iter_mk;
    // mk: the key being looked up; iter_mk: the entry the iterator landed on.
    mk.parseKey(key);
    if(moptions_.encode_version_node){
      Slice buf = GetLengthPrefixedSlice(iter->key());
      iter_mk.parseKey(buf);
    } else {
      auto node = static_cast<UncodedVersionNode *>(iter->node());
      iter_mk = node->GetMvccKey();
    }
    Slice start, end;
    start = iter_mk.userKey_;

    // The looked-up key sorts before this entry's start, so it can only be
    // covered by the previous range-deletion entry.
    if(mk.userKey_.compare(start) < 0){
      iter->Prev();
      if(iter->Valid()){
        AnalysisNode(iter_mk, iter, &iter_seq, &iter_t, end);
        UnPackSequenceAndType(mk.seqNumAndType_, &seq, &t);

        // Covered when the key is before the previous entry's end and is
        // not newer than that entry.
        if(mk.userKey_.compare(end) < 0 && seq <= iter_seq)
          return iter;
        return nullptr;
      }
      // No previous entry: hand back the (now invalid) iterator.
      return iter;
    }
    UnPackSequenceAndType(mk.seqNumAndType_, &seq, &t);

    AnalysisNode(iter_mk, iter, &iter_seq, &iter_t, end);
    start = iter_mk.userKey_;
    // Same start key but the looked-up key is newer than the entry:
    // reported as not covered.
    if(mk.userKey_.compare(start) == 0 && seq > iter_seq)
      return nullptr;
  }
  return iter;
}

// Decodes the range-deletion entry the iterator currently points at.
//
//   mk        out: the entry's parsed key.
//   iter      iterator positioned on a valid entry.
//   iter_seq  out: the entry's sequence number.
//   iter_t    out: the entry's value type.
//   end       out: the entry's value (used by callers as the range end).
//
// Both storage modes now fill every output. Previously the uncoded branch
// left `mk` and `*iter_t` untouched, so callers could read stale or
// uninitialized values.
void MemArena::AnalysisNode( MvccKey &mk, ArtTree::Iterator *iter,
                            uint64_t *iter_seq, ValueType *iter_t,
                            Slice &end) {
  if(moptions_.encode_version_node){
    // Encoded layout: length-prefixed key immediately followed by the
    // length-prefixed value.
    Slice buf = GetLengthPrefixedSlice(iter->key());
    end = GetLengthPrefixedSlice(buf.data() + buf.size());
    mk.parseKey(buf);
    UnPackSequenceAndType(mk.seqNumAndType_, iter_seq, iter_t);
  } else {
    auto node = dynamic_cast<UncodedVersionNode *>(iter->node());
    // A failed cast would otherwise be dereferenced silently below.
    assert(node != nullptr);
    end = node->GetValue();
    *iter_seq = node->GetSequenceNumber();
    *iter_t = node->GetValueType();
    mk = node->GetMvccKey();
  }
}

// Takes one additional reference on this arena.
void MemArena::Ref() {
    refs_ += 1;
}

// Drops one reference. When the count reaches zero the arena deletes itself
// and true is returned; the object must not be touched afterwards.
// Returns false while other references remain.
bool MemArena::Unref() {
    assert(refs_ >= 1);
    // Decrement and test in one expression so the zero check uses the value
    // produced by this call's own decrement, not a later re-read of refs_.
    if (--refs_ == 0) {
        delete this;
        return true;
    }
    return false;
}

// Compares two entries that are both stored as length-prefixed internal keys.
// The prefixes are stripped and the result of CompareKeySeq() is returned.
int MemArena::KeyComparator::operator()(const char *prefix_len_key1, const char *prefix_len_key2) const {
  Slice lhs = GetLengthPrefixedSlice(prefix_len_key1);
  Slice rhs = GetLengthPrefixedSlice(prefix_len_key2);
  const int order = comparator.CompareKeySeq(lhs, rhs);
  return order;
}

// Compares a length-prefixed internal key (left) with an already decoded key
// (right) via CompareKeySeq().
int MemArena::KeyComparator::operator()(const char *prefix_len_key,
                                        const MemTableRep::KeyComparator::DecodedType &key) const {
  Slice stored = GetLengthPrefixedSlice(prefix_len_key);
  const int order = comparator.CompareKeySeq(stored, key);
  return order;
}

// Compares (key_a, seq) against key_b by forwarding to CompareKeySeq().
int MemArena::KeyComparator::operator()(const Slice &key_a,
    SequenceNumber seq, const Slice &key_b) const {
  const int order = comparator.CompareKeySeq(key_a, seq, key_b);
  return order;
}

// Compares key1 and key2 together with their explicitly supplied sequence
// numbers by forwarding to CompareKeySeq().
int MemArena::KeyComparator::operator()(const Slice &key1, const Slice &key2,
                                        SequenceNumber seq1,
                                        SequenceNumber seq2) const {
  const int order = comparator.CompareKeySeq(key1, key2, seq1, seq2);
  return order;
}

// Compares (key, seq) on the left with a length-prefixed internal key on the
// right; the prefix is stripped before calling CompareKeySeq().
int MemArena::KeyComparator::operator()(const Slice &key, SequenceNumber seq,
                                        const char *prefix_len_key) const {
  Slice stored = GetLengthPrefixedSlice(prefix_len_key);
  const int order = comparator.CompareKeySeq(key, seq, stored);
  return order;
}

// Callback from MemTable::Get()
namespace {

// Bundle of lookup state shared between MemArena::Get() and the SaveValue()
// callback. Get() fills in every field before starting the lookup.
struct Saver {
    Status* status;                 // result status reported to the caller
    const LookupKey* key;           // key being looked up
    bool* found_final_value;        // Is value set correctly? Used by KeyMayExist
    bool* merge_in_progress;        // merge operands have been seen so far
    std::string* value;             // destination for the value; may be null
    SequenceNumber seq;             // sequence number of the last entry examined
    const MergeOperator* merge_operator;
    MergeContext* merge_context;    // the merge operations encountered
    SequenceNumber max_covering_tombstone_seq;
    MemArena* mem_arena;
    Logger* logger;
    Statistics* statistics;
    bool inplace_update_support;
    Env* env_;
    ReadCallback* callback_;        // optional visibility filter; may be null
    bool* is_blob_index;            // optional out flag; may be null

    // Returns whether an entry with sequence number `_seq` is visible.
    // Without a callback every entry is visible.
    bool CheckCallback(SequenceNumber _seq) {
        if (callback_ == nullptr) {
            return true;
        }
        return callback_->IsVisible(_seq);
    }
};

}  // namespace

// Per-entry callback used by MemArena::Get().
//
//   arg    the Saver prepared by Get().
//   entry  the entry in encoded form; only read when the node is not UNCODED.
//   node   the version node for the entry; its NodeType selects how the key,
//          tag and value are extracted.
//
// Returns true when the lookup should continue with the next entry (the
// entry was not visible, or it was a merge operand that did not complete the
// merge) and false when the lookup should stop.
static bool SaveValue(void* arg, const char* entry, VersionNode* node) {   // original author's note: "add locking"
    Saver* s = reinterpret_cast<Saver*>(arg);
    assert(s != nullptr);
    MergeContext* merge_context = s->merge_context;
    SequenceNumber max_covering_tombstone_seq = s->max_covering_tombstone_seq;
    const MergeOperator* merge_operator = s->merge_operator;

    assert(merge_context != nullptr);

    // entry format is:
    //    klength  varint32
    //    userkey  char[klength-8]
    //    tag      uint64
    //    vlength  varint32
    //    value    char[vlength]
    // Check that it belongs to same user key.  We do not check the
    // sequence number since the Seek() call above should have skipped
    // all entries with overly large sequence numbers.
    VersionNode::NodeType node_type = node->GetNodeType();
    Slice key;
    uint32_t key_length = 0;
    const char *key_ptr = nullptr;

    // Extract the user key: directly from the node when uncoded, otherwise
    // by decoding the varint32 length prefix of `entry`.
    if(node_type == VersionNode::NodeType::UNCODED){
      key = ((UncodedVersionNode*)node)->GetKey();
      key_length = key.size() + 8;
    } else {
      key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
      key = Slice(key_ptr, key_length - 8);
    }
    if (s->mem_arena->GetInternalKeyComparator().user_comparator()->Equal(
            key, s->key->user_key())) {
        // Correct user key
      ValueType type;
      SequenceNumber seq;
      if(node_type == VersionNode::NodeType::UNCODED){
        type = ((UncodedVersionNode*)node)->GetValueType();
        seq = ((UncodedVersionNode*)node)->GetSequenceNumber();
      } else {
        // The 8-byte tag trailing the user key packs sequence number and type.
        const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
        UnPackSequenceAndType(tag, &seq, &type);
      }

      // If the value is not in the snapshot, skip it
        if (!s->CheckCallback(seq)) {
            return true;  // to continue to the next seq
        }

        s->seq = seq;

        // An entry older than the covering range tombstone is handled as if
        // it were a range deletion.
        if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex) &&
            max_covering_tombstone_seq > seq) {
            type = kTypeRangeDeletion;
        }
        switch (type) {
            case kTypeBlobIndex:
                if (s->is_blob_index == nullptr) {
                    ROCKS_LOG_ERROR(s->logger, "Encounter unexpected blob index.");
                    *(s->status) = Status::NotSupported(
                            "Encounter unsupported blob value. Please open DB with "
                            "rocksdb::blob_db::BlobDB instead.");
                } else if (*(s->merge_in_progress)) {
                    *(s->status) =
                            Status::NotSupported("Blob DB does not support merge operator.");
                }
                if (!s->status->ok()) {
                    *(s->found_final_value) = true;
                    return false;
                }
                // A supported blob index is read exactly like a plain value.
                FALLTHROUGH_INTENDED;
            case kTypeValue: {
                // With in-place updates enabled the value is read under the
                // per-key read lock, which is released manually below.
                if (s->inplace_update_support) {
                    s->mem_arena->GetLock(s->key->user_key())->ReadLock();
                }
              Slice v;
              if(node_type == VersionNode::NodeType::UNCODED){
                  v = ((UncodedVersionNode*)node)->GetValue();
                } else{
                  v = GetLengthPrefixedSlice(key_ptr + key_length);
                }
                *(s->status) = Status::OK();
                if (*(s->merge_in_progress)) {
                    // Pending merge operands: combine them with this base value.
                    if (s->value != nullptr) {
                        *(s->status) = MergeHelper::TimedFullMerge(
                                merge_operator, s->key->user_key(), &v,
                                merge_context->GetOperands(), s->value, s->logger,
                                s->statistics, s->env_, nullptr /* result_operand */, true);
                    }
                } else if (s->value != nullptr) {
                    s->value->assign(v.data(), v.size());
                }
                if (s->inplace_update_support) {
                    s->mem_arena->GetLock(s->key->user_key())->ReadUnlock();
                }
                *(s->found_final_value) = true;
                if (s->is_blob_index != nullptr) {
                    *(s->is_blob_index) = (type == kTypeBlobIndex);
                }
                return false;
            }
            case kTypeDeletion:
            case kTypeSingleDeletion:
            case kTypeRangeDeletion: {
                if (*(s->merge_in_progress)) {
                    // Pending merge operands are merged without a base value.
                    if (s->value != nullptr) {
                        *(s->status) = MergeHelper::TimedFullMerge(
                                merge_operator, s->key->user_key(), nullptr,
                                merge_context->GetOperands(), s->value, s->logger,
                                s->statistics, s->env_, nullptr /* result_operand */, true);
                    }
                } else {
                    *(s->status) = Status::NotFound();
                }
                *(s->found_final_value) = true;
                return false;
            }
            case kTypeMerge: {
                if (!merge_operator) {
                    *(s->status) = Status::InvalidArgument(
                            "merge_operator is not properly initialized.");
                    // Normally we continue the loop (return true) when we see a merge
                    // operand.  But in case of an error, we should stop the loop
                    // immediately and pretend we have found the value to stop further
                    // seek.  Otherwise, the later call will override this error status.
                    *(s->found_final_value) = true;
                    return false;
                }
                Slice v;
                if(node_type == VersionNode::NodeType::UNCODED){
                  v = ((UncodedVersionNode*)node)->GetValue();
                } else{
                  v = GetLengthPrefixedSlice(key_ptr + key_length);
                }
                *(s->merge_in_progress) = true;
                merge_context->PushOperand(
                        v, s->inplace_update_support == false /* operand_pinned */);
                // The operator may decide the operands collected so far are
                // enough to produce the final value.
                if (merge_operator->ShouldMerge(
                        merge_context->GetOperandsDirectionBackward())) {
                    *(s->status) = MergeHelper::TimedFullMerge(
                            merge_operator, s->key->user_key(), nullptr,
                            merge_context->GetOperands(), s->value, s->logger, s->statistics,
                            s->env_, nullptr /* result_operand */, true);
                    *(s->found_final_value) = true;
                    return false;
                }
                return true;
            }
            default:
                assert(false);
                return true;
        }
    }

    // Different user key: stop the lookup.
    // s->state could be Corrupt, merge or notfound
    return false;
}

// Looks `key` up in this arena.
//
//   value                       destination for the value; may be null.
//   s                           in: may already be MergeInProgress;
//                               out: status of the lookup.
//   merge_context               collects merge operands met along the way.
//   max_covering_tombstone_seq  in/out: raised to the newest range tombstone
//                               in this arena that covers the key.
//   seq                         in: read sequence used for the range
//                               tombstone iterator;
//                               out: sequence number recorded by the lookup
//                               (kMaxSequenceNumber if no entry was examined).
//   read_opts                   currently unused here.
//   callback                    optional visibility filter.
//   is_blob_index               optional out flag, see SaveValue().
//
// Returns true when a final value (or deletion) for the key was found.
//
// Fix: the previous early exit `if (GetKVInsertNum()) return false;` was
// inverted - it reported "not found" as soon as any KV data had been
// inserted. The point lookup is now skipped only when nothing has been
// inserted, and range tombstones are still consulted in that case.
// NOTE(review): assumes GetKVInsertNum() reports kv_insert_num_, i.e. is
// non-zero exactly when KV entries exist - confirm against its definition.
bool MemArena::Get(const LookupKey& key, std::string* value, Status* s,
                   MergeContext* merge_context,
                   SequenceNumber* max_covering_tombstone_seq,
                   SequenceNumber* seq, const ReadOptions& read_opts,
                   ReadCallback* callback, bool* is_blob_index) {
    // The sequence number is updated synchronously in version_set.h
    PERF_TIMER_GUARD(get_from_memtable_time);

    std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
            NewRangeTombstoneIterator(*seq,
                                      this->mkey_cmp_.comparator));
    if (range_del_iter != nullptr) {
        *max_covering_tombstone_seq =
                std::max(*max_covering_tombstone_seq,
                         range_del_iter->MaxCoveringTombstoneSeqnum(key.user_key()));
    }

    bool found_final_value = false;
    bool merge_in_progress = s->IsMergeInProgress();
    Saver saver;
    saver.status = s;
    saver.found_final_value = &found_final_value;
    saver.merge_in_progress = &merge_in_progress;
    saver.key = &key;
    saver.value = value;
    saver.seq = kMaxSequenceNumber;
    saver.mem_arena = this;
    saver.merge_context = merge_context;
    saver.max_covering_tombstone_seq = *max_covering_tombstone_seq;
    saver.merge_operator = moptions_.merge_operator;
    saver.logger = moptions_.info_log;
    saver.inplace_update_support = moptions_.inplace_update_support;
    saver.statistics = moptions_.statistics;
    saver.env_ = moptions_.env;
    saver.callback_ = callback;
    saver.is_blob_index = is_blob_index;

    // Only walk the KV list when something has been inserted into it.
    if (GetKVInsertNum() != 0) {
        Get(key, &saver, SaveValue);
    }

    *seq = saver.seq;

    // No change to value, since we have not yet found a Put/Delete
    if (!found_final_value && merge_in_progress) {
        *s = Status::MergeInProgress();
    }
    PERF_COUNTER_ADD(get_from_memtable_count, 1);
    return found_final_value;
}


    // Hands this arena's range tombstones to `range_del_agg`.
    // The tombstone iterator is created at the snapshot's sequence number
    // when read_opts carries a snapshot, and at kMaxSequenceNumber otherwise.
    // Always returns Status::OK().
    Status MemArena::AddRangeTombstoneIterators(const ReadOptions &read_opts, RangeDelAggregator *range_del_agg) {
        assert(range_del_agg != nullptr);

        // Except for snapshot read, using kMaxSequenceNumber is OK .
        SequenceNumber read_seq = kMaxSequenceNumber;
        if (read_opts.snapshot != nullptr) {
            read_seq = read_opts.snapshot->GetSequenceNumber();
        }

        std::unique_ptr<FragmentedRangeTombstoneIterator> tombstones(
                this->NewRangeTombstoneIterator(read_seq, mkey_cmp_.comparator));
        range_del_agg->AddTombstones(std::move(tombstones));
        return Status::OK();
    }

    // Returns the lock assigned to `key`: the key's hash selects one slot of
    // the lock table.
    //
    // Precondition: the lock table is non-empty. The constructor creates it
    // with zero entries when in-place update support is off, and the modulo
    // below would then divide by zero - so only call this when in-place
    // updates are enabled.
    port::RWMutex *MemArena::GetLock(const Slice &key) {
        assert(locks_.size() != 0);
        const size_t slot =
                static_cast<size_t>(GetSliceNPHash64(key)) % locks_.size();
        return &locks_[slot];
    }

}
